APPEND operation failed with HTTP 500?


package org.apache.spark.examples.kafkaToflink;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.microsoft.azure.datalake.store.ADLException;
import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;

public class App {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.72:9092");
        properties.setProperty("group.id", "test");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<String>("tenant", new SimpleStringSchema(), properties),
                "Kafka_Source");

        // 128 parallel sink instances, all appending to the same ADLS file.
        stream.addSink(new ADLSink()).name("Custom_Sink").setParallelism(128);

        env.execute("App");
    }
}

// Note: the original declared "class ADLSink<String>", which makes "String" a type
// variable shadowing java.lang.String; dropping the type parameter fixes that, so
// the fields can use plain String.
class ADLSink extends RichSinkFunction<String> {

    private String clientId = "***********";
    private String authTokenEndpoint = "***************";
    private String clientKey = "*****************";
    private String accountFQDN = "****************";
    private String filename = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK.json";

    @Override
    public void invoke(String value) {
        // A new token provider and store client are created for every incoming record.
        AccessTokenProvider provider = new ClientCredsTokenProvider(authTokenEndpoint, clientId, clientKey);
        ADLStoreClient client = ADLStoreClient.createClient(accountFQDN, provider);
        try {
            client.setPermission(filename, "744");
            ADLFileOutputStream stream = client.getAppendStream(filename);
            System.out.println(value);
            stream.write(value.getBytes(StandardCharsets.UTF_8));
            stream.close();
        } catch (ADLException e) {
            System.out.println(e.requestId);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.out.println(e.getCause());
        }
    }
}

I keep trying to append to a file in Azure Data Lake Store using a while loop, but sometimes it fails with "APPEND operation failed with HTTP 500", either right at startup or sometimes after about 10 minutes. I am using Java.
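
Two structural things in the code above are worth flagging: a brand-new token provider and ADLStoreClient are built on every invoke(), and 128 parallel sink instances all append to the same file, which Data Lake Store does not handle well for concurrent writers. Below is a minimal sketch (the class name, the file-per-subtask naming scheme, and the masked credentials are illustrative, not from the question) that creates the client once per subtask in open() and gives each subtask its own file:

import java.nio.charset.StandardCharsets;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;

// Sketch only: one client and one output file per parallel subtask.
class PerSubtaskADLSink extends RichSinkFunction<String> {

    private transient ADLStoreClient client;
    private transient ADLFileOutputStream out;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Created once per subtask instead of once per record.
        client = ADLStoreClient.createClient(
                "****************", // accountFQDN, masked as in the question
                new ClientCredsTokenProvider("***", "***", "***"));
        // A distinct file per subtask avoids concurrent appends to one file.
        String path = "/Bitfinex/ETHBTC/ORDERBOOK/ORDERBOOK-"
                + getRuntimeContext().getIndexOfThisSubtask() + ".json";
        out = client.createFile(path, IfExists.OVERWRITE);
    }

    @Override
    public void invoke(String value) throws Exception {
        out.write((value + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws Exception {
        if (out != null) {
            out.close(); // flushes any buffered bytes
        }
    }
}

With the stream held open like this, the SDK can buffer writes client-side and only hit the service on its flush boundaries, which also cuts the request rate dramatically compared to one open/setPermission/append/close cycle per record.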
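
Independently of that restructuring, intermittent 5xx responses from the store are usually transient and can be retried. A small hedged sketch (the helper name, attempt count, and backoff values are my own choices, not from the question):

import com.microsoft.azure.datalake.store.ADLException;
import com.microsoft.azure.datalake.store.ADLFileOutputStream;
import com.microsoft.azure.datalake.store.ADLStoreClient;

class AppendRetry {

    // Retries transient ADLS failures (HTTP 5xx) with exponential backoff.
    static void appendWithRetry(ADLStoreClient client, String path, byte[] bytes)
            throws Exception {
        int maxAttempts = 4;
        long backoffMs = 500;
        for (int attempt = 1; ; attempt++) {
            try (ADLFileOutputStream out = client.getAppendStream(path)) {
                out.write(bytes);
                return; // success
            } catch (ADLException e) {
                // Give up on client-side (4xx) errors or once attempts are exhausted.
                if (attempt >= maxAttempts || e.httpResponseCode < 500) {
                    throw e;
                }
                Thread.sleep(backoffMs);
                backoffMs *= 2; // 500 ms, 1 s, 2 s, ...
            }
        }
    }
}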